package rmq

import (
	"encoding/json"
	"gitee.com/masaichi/mlib/rmq/rmqconstant"
	"gitee.com/masaichi/mlib/rmq/rmqlib"
	"gitee.com/masaichi/mlib/rmq/rmqstruct"
	"github.com/streadway/amqp"
	"log"
	"reflect"
)

//队列消费者
func CommonQueueConsume(callbackConfig map[string]interface{}) {
	mq := rmqlib.NewMQ()
	mq.Consume(rmqconstant.COMMON_QUEUE, rmqconstant.COMMON_ROUTING_KEY, commonCallback, callbackConfig)
}

//延迟队列消费者
func CommonDelayQueueConsume(callbackConfig map[string]interface{}) {
	mq := rmqlib.NewMQ()
	mq.Consume(rmqconstant.COMMON_DELAY_QUEUE, rmqconstant.COMMON_DELAY_ROUTING_KEY, commonDelayCallback, callbackConfig)
}

func CommonFanoutQueueConsume(callbackConfig map[string]interface{}) {
	mq := rmqlib.NewMQ()
	//定义队列并且绑定交换机
	mq.Consume(rmqconstant.COMMON_FANOUT_QUEUE, "", commonDelayCallback, callbackConfig)
}

func commonCallback(msgs <-chan amqp.Delivery, str string, config map[string]interface{}) {
	for msg := range msgs {
		paramItem := rmqstruct.NewEmptyQueueParamItem()
		err := json.Unmarshal(msg.Body, &paramItem)
		if err != nil {
			log.Println(err)
		} else {
			//根据service进行不同的回调处理
			log.Printf("service:%v", paramItem)
			//如果没有0和1，那么抛出错误
			//获取结构体的指针
			if len(paramItem.Callback) < 2 {
				//记录日志，不做处理
				log.Printf("common_queue 消息格式有误: %v", paramItem)
			} else {
				p := config[paramItem.Callback[0]]
				f := reflect.ValueOf(p).MethodByName(paramItem.Callback[1])
				ret, _ := json.Marshal(paramItem)
				f.Call([]reflect.Value{reflect.ValueOf(string(ret))})
			}
		}
		msg.Ack(false)
	}
}

func commonDelayCallback(msgs <-chan amqp.Delivery, str string, config map[string]interface{}) {
	for msg := range msgs {
		paramItem := rmqstruct.NewEmptyQueueParamItem()
		err := json.Unmarshal(msg.Body, &paramItem)
		if err != nil {
			log.Println(err)
		} else {
			//根据service进行不同的回调处理
			log.Printf("service:%v", paramItem)
			//如果没有0和1，那么抛出错误
			//获取结构体的指针
			if len(paramItem.Callback) < 2 {
				//记录日志，不做处理
				log.Printf("common_delay_queue 消息格式有误: %v", paramItem)
			} else {
				p := config[paramItem.Callback[0]]
				f := reflect.ValueOf(p).MethodByName(paramItem.Callback[1])
				ret, _ := json.Marshal(paramItem)
				f.Call([]reflect.Value{reflect.ValueOf(string(ret))})
			}
		}
		msg.Ack(false)
	}
}

func commonFanoutCallback(msgs <-chan amqp.Delivery, str string, config map[string]interface{}) {
	for msg := range msgs {
		paramItem := rmqstruct.NewEmptyQueueParamItem()
		err := json.Unmarshal(msg.Body, &paramItem)
		if err != nil {
			log.Println(err)
		} else {
			//根据service进行不同的回调处理
			log.Printf("service:%v", paramItem)
			//如果没有0和1，那么抛出错误
			//获取结构体的指针
			if len(paramItem.Callback) < 2 {
				//记录日志，不做处理
				log.Printf("common_fanout_queue 消息格式有误: %v", paramItem)
			} else {
				p := config[paramItem.Callback[0]]
				f := reflect.ValueOf(p).MethodByName(paramItem.Callback[1])
				ret, _ := json.Marshal(paramItem)
				f.Call([]reflect.Value{reflect.ValueOf(string(ret))})
			}
		}
		msg.Ack(false)
	}
}
